c45e11cfba168e1544e97b86d6d0782109ef8be4,src/main/java/com/couchbase/client/dcp/conductor/Conductor.java,Conductor,startStreamForPartition,#number#number#number#number#number#number#,160
Before Change
/**
 * Starts a stream for the given partition on the channel returned by
 * {@code masterChannelByPartition(partition)}.
 *
 * <p>When that channel reports {@link LifecycleState#CONNECTED}, the request is handed
 * straight to {@code openStream}. Otherwise a debug message is logged and, once a
 * 100 millisecond timer fires, this method is invoked again with the same arguments;
 * this repeats for as long as the channel is not connected.
 *
 * @param partition the partition (vbucket) id to stream.
 * @param vbuuid the vbucket uuid passed through to {@code openStream}.
 * @param startSeqno the start sequence number passed through to {@code openStream}.
 * @param endSeqno the end sequence number passed through to {@code openStream}.
 * @param snapshotStartSeqno the snapshot start sequence number passed through to {@code openStream}.
 * @param snapshotEndSeqno the snapshot end sequence number passed through to {@code openStream}.
 * @return the {@link Completable} from {@code openStream}, or one derived from the
 *         delayed re-invocation of this method when the channel is not connected.
 */
public Completable startStreamForPartition(final short partition, final long vbuuid, final long startSeqno,
    final long endSeqno, final long snapshotStartSeqno, final long snapshotEndSeqno) {
    final DcpChannel master = masterChannelByPartition(partition);

    // Fast path: the channel is ready, open the stream right away.
    if (master.state() == LifecycleState.CONNECTED) {
        return master.openStream(partition, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno);
    }

    LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", partition);

    // Not connected: wait briefly, then try the whole call again with identical arguments.
    final Observable<Long> backoff = Observable.timer(100, TimeUnit.MILLISECONDS);
    final Func1<Long, Observable<?>> tryAgain = new Func1<Long, Observable<?>>() {
        @Override
        public Observable<?> call(Long tick) {
            final Completable nextAttempt = startStreamForPartition(partition, vbuuid, startSeqno, endSeqno,
                snapshotStartSeqno, snapshotEndSeqno);
            return nextAttempt.toObservable();
        }
    };
    return backoff.flatMap(tryAgain).toCompletable();
}
public Completable stopStreamForPartition(final short partition) {
After Change
.toObservable();
}
})
.retryWhen(anyOf(NotConnectedException.class)
.delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
.doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
@Override
public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", partition);
}
})
.build()
)
.toCompletable();
}